Python脚本:CSV 文件合并工具说明书

1. 简介

本工具是一个 Python 脚本,旨在帮助用户方便地将多个具有相同表头的 CSV (逗号分隔值) 文件合并成一个单一的 CSV 文件。它提供了一个交互式的界面,允许用户在运行时选择单个或多个 CSV 文件,或者选择一个包含多个 CSV 文件的文件夹进行合并。

主要功能:

2. 系统要求

3. 开始之前

4. 如何使用

步骤 1: 保存脚本

  1. 将提供的 Python 脚本代码复制到一个纯文本文件中。
  2. 将该文件保存为以 .py 结尾的文件,例如:interactive_merge_csv.py

步骤 2: 运行脚本

  1. 打开您的操作系统终端(在 macOS 或 Linux 上是 Terminal,在 Windows 上是命令提示符(Command Prompt)或 PowerShell)。
  2. 使用 cd 命令导航到您保存 interactive_merge_csv.py 脚本的目录。例如:
    cd /path/to/your/script_directory
    
    或者在 Windows 上:
    cd C:\path\to\your\script_directory
    
  3. 执行脚本:
    python3 interactive_merge_csv.py
    
    (如果 python3 命令无效,请尝试使用 python interactive_merge_csv.py)

步骤 3: 选择输入方式

脚本运行后,终端会首先提示您选择输入文件的方式:

你想选择 (F)单个或多个CSV文件 还是 (D)一个包含CSV文件的文件夹? [F/D]:

步骤 4: 选择文件或文件夹

根据您在步骤 3 中的选择,会弹出一个图形对话框:

如果您未选择任何文件/文件夹或关闭了对话框,脚本将在终端提示并退出。

步骤 5: 选择输出位置和文件名

如果成功选择了输入文件且文件初步检查(例如,文件夹模式下找到了 CSV 文件)通过,脚本会处理数据。在准备好写入合并数据之前,会弹出另一个对话框:

如果您在此步骤取消或关闭对话框,合并后的文件将不会被保存,脚本会提示操作已取消。

步骤 6: 查看结果

5. 功能详解

6. 故障排除

7. 注意事项

代码

import csv
import os
import glob
from pathlib import Path
from tkinter import Tk, filedialog

def merge_csv_files(csv_file_paths):
    """
    合并用户选择的多个表头相同的CSV文件。

    Args:
        csv_file_paths (list): 包含所有待合并CSV文件绝对路径的列表。

    Returns:
        bool: 如果合并成功返回 True,否则返回 False。
    """
    if not csv_file_paths:
        print("错误:没有选择任何CSV文件进行合并。")
        return False

    print(f"准备合并以下CSV文件: {csv_file_paths}")

    headers_map = {}  # 用于存储每个文件的表头: {filepath: header_tuple}
    row_counts = {}   # 用于存储每个文件的数据行数: {basename(filepath): count}
    all_data_rows = [] # 用于存储所有文件的数据(不包括表头)
    unified_header = None

    # 第一次遍历:检查表头一致性并统计行数
    for file_path in csv_file_paths:
        try:
            with open(file_path, 'r', newline='', encoding='utf-8-sig') as csvfile:
                reader = csv.reader(csvfile)
                current_header = tuple(next(reader)) # 将表头转为元组以便比较和作为字典键
                headers_map[file_path] = current_header
                
                data_rows_count = 0
                for row in reader:
                    data_rows_count += 1
                row_counts[os.path.basename(file_path)] = data_rows_count
        except StopIteration: # 空文件处理
            print(f"警告:文件 {os.path.basename(file_path)} 是空的,将跳过。")
            row_counts[os.path.basename(file_path)] = 0
            headers_map[file_path] = tuple() # 添加一个空元组作为表头
            continue
        except Exception as e:
            print(f"读取文件 {os.path.basename(file_path)} 时发生错误: {e}")
            return False

    # 检查表头是否都一致
    if not headers_map: # 如果所有选择的文件都是空的或无法读取
        print("错误:未能从所选文件中读取任何有效的表头信息。")
        return False

    # 以第一个有效文件的表头为基准
    first_valid_header = None
    first_valid_file_path = None
    for file_path, header in headers_map.items():
        if header: # 找到第一个非空表头
            first_valid_header = header
            first_valid_file_path = file_path
            break
    
    if first_valid_header is None and any(headers_map.values()): # 所有文件都是空的但至少有一个被处理过
         print("警告:所有选择的CSV文件都是空的(只有表头或完全为空)。将尝试使用第一个文件的(空)表头。")
         # 在这种情况下,如果用户仍想合并(例如,合并多个空数据文件但有相同表头的文件),
         # 可以考虑允许,或者直接报错。目前,如果第一个文件是空的,下面的逻辑会取空表头。
         # 如果headers_map中所有表头都是空元组,它们会被视作一致。
         # 如果某些是空,某些非空,则会报错。

    if not first_valid_header and not any(h for h in headers_map.values() if h): # 如果所有文件都是空的
        print("错误:所有选择的CSV文件都是空的或没有表头。")
        return False


    for file_path, header in headers_map.items():
        if row_counts.get(os.path.basename(file_path), -1) == 0 and not header : # 跳过完全为空且没有记录表头的文件
            continue
        if header != first_valid_header:
            print("\n错误:CSV文件表头不一致,无法合并!")
            print(f"基准表头 (来自文件 '{os.path.basename(first_valid_file_path)}'): {list(first_valid_header) if first_valid_header else '无表头'}")
            print(f"文件 '{os.path.basename(file_path)}' 的表头: {list(header) if header else '无表头'}")
            return False
            
    unified_header = list(first_valid_header) if first_valid_header else []
    print("\n所有有效CSV文件的表头一致。")

    # 第二次遍历:读取数据行
    total_merged_rows = 0
    for file_path in csv_file_paths:
        if row_counts.get(os.path.basename(file_path), 0) == 0 and not headers_map.get(file_path): #再次跳过空文件
             continue
        try:
            with open(file_path, 'r', newline='', encoding='utf-8-sig') as csvfile:
                reader = csv.reader(csvfile)
                next(reader) # 跳过表头
                for row in reader:
                    all_data_rows.append(row)
                    total_merged_rows +=1
        except StopIteration: # 文件只有表头
            continue
        except Exception as e:
            print(f"合并文件 {os.path.basename(file_path)} 数据时发生错误: {e}")
            return False

    # 提示用户选择保存位置和文件名
    print("\n请选择合并后文件的保存位置和名称...")
    output_initial_dir = os.path.dirname(csv_file_paths[0]) if csv_file_paths else os.getcwd()
    output_path = filedialog.asksaveasfilename(
        title="保存合并后的CSV文件",
        initialdir=output_initial_dir,
        initialfile="merged_output.csv",
        defaultextension=".csv",
        filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
    )

    if not output_path:
        print("操作已取消:未选择保存位置。合并后的文件未保存。")
        return False

    # 写入合并后的CSV文件
    try:
        with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            if unified_header: # 确保表头不是空的 (除非所有文件都没有表头)
                writer.writerow(unified_header)
            writer.writerows(all_data_rows)
    except Exception as e:
        print(f"写入合并文件 {os.path.basename(output_path)} 时发生错误: {e}")
        return False

    print("\n--- 文件行数统计 (数据行,不含表头) ---")
    for filename, count in row_counts.items():
        print(f"文件 '{filename}': {count} 行数据")

    print(f"\n成功合并 {len(csv_file_paths)} 个CSV文件到 '{output_path}'")
    print(f"最终合并后的CSV文件 '{os.path.basename(output_path)}' 包含表头和 {total_merged_rows} 行数据。")
    return True

if __name__ == "__main__":
    # 创建一个隐藏的Tkinter主窗口,因为我们只需要对话框
    root = Tk()
    root.withdraw()

    csv_files_to_process = []
    choice = input("你想选择 (F)单个或多个CSV文件 还是 (D)一个包含CSV文件的文件夹? [F/D]: ").strip().upper()

    if choice == 'F':
        print("请在弹出的对话框中选择一个或多个CSV文件...")
        # 使用 askopenfilenames允许多选
        selected_files = filedialog.askopenfilenames(
            title="选择要合并的CSV文件",
            filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
        )
        if selected_files: # askopenfilenames 返回一个元组
            csv_files_to_process = list(selected_files)
        else:
            print("没有选择任何文件。")
    elif choice == 'D':
        print("请在弹出的对话框中选择一个包含CSV文件的文件夹...")
        selected_folder = filedialog.askdirectory(
            title="选择包含CSV文件的文件夹"
        )
        if selected_folder:
            # 查找文件夹下所有的csv文件
            csv_files_to_process = glob.glob(os.path.join(selected_folder, "*.csv"))
            if not csv_files_to_process:
                print(f"在文件夹 '{selected_folder}' 中没有找到任何CSV文件。")
        else:
            print("没有选择任何文件夹。")
    else:
        print("无效的选择。程序将退出。")

    if csv_files_to_process:
        merge_csv_files(csv_files_to_process)
    else:
        print("没有要处理的CSV文件。程序退出。")

    # 显式销毁Tk根窗口(虽然通常在脚本结束时会自动处理)
    root.destroy()